In [1]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
In [2]:
# PROJECT_ID is used to construct the docker image registry. We will use Google Container Registry,
# but any other accessible registry works as well.
PROJECT_ID='Your-Gcp-Project-Id'
In [ ]:
# Install Pipeline SDK
!pip3 install kfp --upgrade
!mkdir -p tmp/pipelines
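If you want to verify that the SDK installed correctly before continuing, a quick optional check (not part of the original steps) is to import it and print its version:
In [ ]:
# Optional sanity check: confirm the KFP SDK import works after the install above.
import kfp
print('KFP SDK version: {}'.format(kfp.__version__))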
A pipeline is composed of one or more components. In this section, you will build a single component that lists the blobs in a GCS bucket, and then build a pipeline that consists of this component. There are two ways to author a component; the following sections go through each of them.
The requirements for the component function:
- The function must be standalone: it cannot use any code or imports declared outside of its definition, so put any needed imports inside the function body.
- If the function operates on numbers, the parameters must have type hints. Supported types are int, float, and bool. Everything else is passed as str, that is, string.
- To build a component with multiple output values, use Python's typing.NamedTuple type hint syntax; see the sketch below.
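For example, here is a minimal sketch (not part of the original notebook) of a function that declares multiple outputs with the typing.NamedTuple syntax:
In [ ]:
from typing import NamedTuple

# Hypothetical example: both outputs are declared through NamedTuple so that
# downstream components can consume them individually.
def add_and_multiply(a: float, b: float) -> NamedTuple(
        'Outputs', [('sum', float), ('product', float)]):
    '''Returns the sum and the product of two numbers.'''
    return (a + b, a * b)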
In [4]:
def list_blobs(bucket_name: str) -> str:
    '''Lists all the blobs in the bucket.'''
    import subprocess
    subprocess.call(['pip', 'install', '--upgrade', 'google-cloud-storage'])
    from google.cloud import storage
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    list_blobs_response = bucket.list_blobs()
    blobs = ','.join([blob.name for blob in list_blobs_response])
    print(blobs)
    return blobs
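If your notebook environment already has GCP credentials and an existing bucket, you can optionally exercise the function directly before converting it (the bucket name below is a placeholder):
In [ ]:
# Optional local test; requires GCS access from this notebook environment.
blob_list = list_blobs('your-bucket-name')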
In [5]:
import kfp.components as comp
# Converts the function to a lightweight Python component.
list_blobs_op = comp.func_to_container_op(list_blobs)
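If the default base image does not suit your function, func_to_container_op also accepts a base_image argument; a hedged sketch (the image name is only an example):
In [ ]:
# Optionally pin the base image used to run the lightweight component.
# list_blobs_op = comp.func_to_container_op(list_blobs, base_image='python:3.7')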
Note that when the component accesses Google Cloud Storage, you need to make sure the pipeline can authenticate to GCP. Refer to Authenticating Pipelines to GCP for details.
In [7]:
import kfp.dsl as dsl
# Defines the pipeline.
@dsl.pipeline(name='List GCS blobs', description='Lists GCS blobs.')
def pipeline_func(bucket_name):
    list_blobs_task = list_blobs_op(bucket_name)
    # Use the following commented code instead if you want to use GSA key for authentication.
    #
    # from kfp.gcp import use_gcp_secret
    # list_blobs_task = list_blobs_op(bucket_name).apply(use_gcp_secret('user-gcp-sa'))
    # Same for below.
# Compile the pipeline to a file.
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, 'tmp/pipelines/list_blobs.pipeline.tar.gz')
Create your own container image that includes your program. If your component creates some outputs to be fed as inputs to downstream components, each separate output must be written as a string to a separate local text file inside the container. For example, if a trainer component needs to output the trained model path, it can write the path to a local file /output.txt. The string written to an output file should not be too big. If it is too big (much more than 100 KB), it is recommended to save the output to an external persistent storage and pass the storage path to the next component.
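As a rough sketch of that pattern (assuming a writable GCS bucket and the google-cloud-storage client; none of the names below come from the original notebook), a component can upload the large payload to GCS and write only its gs:// path to the small output file:
In [ ]:
from google.cloud import storage

def save_large_output(bucket_name: str, payload: str) -> str:
    '''Stores a large payload in GCS and passes on only its path.'''
    client = storage.Client()
    blob = client.get_bucket(bucket_name).blob('outputs/payload.txt')
    blob.upload_from_string(payload)      # the large data goes to GCS
    gcs_path = 'gs://{}/outputs/payload.txt'.format(bucket_name)
    with open('/output.txt', 'w') as f:   # only the short path is written locally
        f.write(gcs_path)
    return gcs_path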
Make sure you have entered the value of your Google Cloud Platform Project ID in the PROJECT_ID cell above; it is used below to construct the image name.
The following cell creates a file app.py that contains a Python script. The script takes a GCS bucket name as an input argument, gets the list of blobs in that bucket, prints the list of blobs, and also writes them to an output file.
In [8]:
%%bash
# Create folders if they don't exist.
mkdir -p tmp/components/list-gcs-blobs
# Create the Python file that lists GCS blobs.
cat > ./tmp/components/list-gcs-blobs/app.py <<HERE
import argparse
from google.cloud import storage
# Parse arguments.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--bucket', type=str, required=True, help='GCS bucket name.')
args = parser.parse_args()
# Create a client.
storage_client = storage.Client()
# List blobs.
bucket = storage_client.get_bucket(args.bucket)
list_blobs_response = bucket.list_blobs()
blobs = ','.join([blob.name for blob in list_blobs_response])
print(blobs)
with open('/blobs.txt', 'w') as f:
    f.write(blobs)
HERE
Now create a container that runs the script. Start by creating a Dockerfile. A Dockerfile contains the instructions to assemble a Docker image. The FROM statement specifies the base image from which you are building. WORKDIR sets the working directory. When you assemble the Docker image, COPY copies the required files and directories (for example, app.py) into the filesystem of the container. RUN executes a command (for example, installing the dependencies) and commits the result.
In [9]:
%%bash
# Create Dockerfile.
cat > ./tmp/components/list-gcs-blobs/Dockerfile <<EOF
FROM python:3.6-slim
WORKDIR /app
COPY . /app
RUN pip install --upgrade google-cloud-storage
EOF
Now that we have created our Dockerfile, we can build our Docker image and push it to a registry that hosts the image. Create a shell script that builds the container image and stores it in the Google Container Registry.
In [10]:
%%bash -s "{PROJECT_ID}"
IMAGE_NAME="listgcsblobs"
TAG="latest" # "v_$(date +%Y%m%d_%H%M%S)"
# Create script to build docker image and push it.
cat > ./tmp/components/list-gcs-blobs/build_image.sh <<HERE
PROJECT_ID="${1}"
IMAGE_NAME="${IMAGE_NAME}"
TAG="${TAG}"
GCR_IMAGE="gcr.io/\${PROJECT_ID}/\${IMAGE_NAME}:\${TAG}"
docker build -t \${IMAGE_NAME} .
docker tag \${IMAGE_NAME} \${GCR_IMAGE}
docker push \${GCR_IMAGE}
docker image rm \${IMAGE_NAME}
docker image rm \${GCR_IMAGE}
HERE
Run the script.
In [ ]:
%%bash
# Build and push the image.
cd tmp/components/list-gcs-blobs
bash build_image.sh
Define a component by creating an instance of kfp.dsl.ContainerOp that describes the interactions with the Docker container image created in the previous step. You need to specify the component name, the image to use, the command to run after the container starts, the input arguments, and the file outputs.
In [12]:
import kfp.dsl
def list_gcs_blobs_op(name, bucket):
    return kfp.dsl.ContainerOp(
        name=name,
        image='gcr.io/{}/listgcsblobs:latest'.format(PROJECT_ID),
        command=['python', '/app/app.py'],
        file_outputs={'blobs': '/blobs.txt'},
        arguments=['--bucket', bucket]
    )
In [13]:
# Create folders if they don't exist.
!mkdir -p tmp/pipelines
Define your pipeline as a Python function. @kfp.dsl.pipeline is a required decorator that includes the name and description properties. Then compile the pipeline function. After the compilation is completed, a pipeline file is created.
In [14]:
import datetime
import kfp.compiler as compiler
# Define the pipeline
@kfp.dsl.pipeline(
    name='List GCS Blobs',
    description='Takes a GCS bucket name as input and lists the blobs.'
)
def pipeline_func(bucket='Enter your bucket name here.'):
    list_blobs_task = list_gcs_blobs_op('List', bucket)

# Compile the pipeline to a file.
filename = 'tmp/pipelines/list_blobs{dt:%Y%m%d_%H%M%S}.pipeline.tar.gz'.format(
    dt=datetime.datetime.now())
compiler.Compiler().compile(pipeline_func, filename)
Follow the instructions on kubeflow.org to access Kubeflow UIs. Upload the created pipeline and run it.
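If your notebook can reach the Kubeflow Pipelines endpoint, you can also submit the run directly from here with the SDK client; a sketch under that assumption (the host URL and bucket value are placeholders):
In [ ]:
import kfp

# Connect to the KFP API server; pass host='https://<your-kfp-endpoint>' if needed.
client = kfp.Client()
client.create_run_from_pipeline_func(
    pipeline_func,
    arguments={'bucket': 'your-bucket-name'})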
Warning: When the pipeline is run, it pulls the image from the repository to the Kubernetes cluster to create a container. Kubernetes caches pulled images, so if you push a new image under the same name and tag, the cluster may keep running the old one. One solution is to use the image digest instead of the tag in your component DSL, for example, s/v1/sha256:9509182e27dcba6d6903fccf444dc6188709cc094a018d5dd4211573597485c9/g. Alternatively, if you don't want to update the digest every time, you can use the :latest tag, which forces Kubernetes to always pull the latest image.
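A related knob (an assumption, not shown in the original notebook) is the image pull policy: you can force Kubernetes to re-pull the image on every run by setting it on the task inside the pipeline function, for example:
In [ ]:
# Inside pipeline_func, after creating the task:
# list_blobs_task = list_gcs_blobs_op('List', bucket)
# list_blobs_task.container.set_image_pull_policy('Always')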
In this section, you will build another component and then see how to connect components to build a multi-component pipeline. You will build the new component by building a Docker container image and wrapping it in a ContainerOp.
Build a component that takes the output of the first component explained in the preceding section (that is, the list of GCS blobs), selects the file whose name ends in iris.csv, and displays its content as an artifact. Start by uploading to your storage bucket the quickstart_iris.csv file that is included in the repository.
In [ ]:
%%bash -s "{PROJECT_ID}"
# Create folders if they don't exist.
mkdir -p tmp/components/view-input
# Create the Python file that selects and views the input CSV.
cat > ./tmp/components/view-input/app.py <<HERE
import argparse
import json
from google.cloud import storage
# Parse arguments.
parser = argparse.ArgumentParser()
parser.add_argument('--blobs', type=str, required=True, help='List of blobs.')
args = parser.parse_args()
blobs = args.blobs.split(',')
inputs = filter(lambda s: s.endswith('iris.csv'), blobs)
input = list(inputs)[0]
print('The CSV file is {}'.format(input))
# CSV header.
header = [
    'sepal_length',
    'sepal_width',
    'petal_length',
    'petal_width',
    'species',
]
# Add metadata for the output artifact.
metadata = {
    'outputs': [{
        'type': 'table',
        'storage': 'gcs',
        'format': 'csv',
        'header': header,
        'source': input
    }]
}
print(metadata)
# Create an artifact.
with open('/mlpipeline-ui-metadata.json', 'w') as f:
    json.dump(metadata, f)
HERE
# Create Dockerfile.
cat > ./tmp/components/view-input/Dockerfile <<HERE
FROM python:3.6-slim
WORKDIR /app
COPY . /app
RUN pip install --upgrade google-cloud-storage
HERE
# Create script to build docker image and push it.
IMAGE_NAME="viewinput"
TAG="latest" # "v_$(date +%Y%m%d_%H%M%S)"
cat > ./tmp/components/view-input/build_image.sh <<HERE
PROJECT_ID="${1}"
IMAGE_NAME="${IMAGE_NAME}"
TAG="${TAG}"
GCR_IMAGE="gcr.io/\${PROJECT_ID}/\${IMAGE_NAME}:\${TAG}"
docker build -t \${IMAGE_NAME} .
docker tag \${IMAGE_NAME} \${GCR_IMAGE}
docker push \${GCR_IMAGE}
docker image rm \${IMAGE_NAME}
docker image rm \${GCR_IMAGE}
HERE
# Build and push the image.
cd tmp/components/view-input
bash build_image.sh
Define each of your components by using kfp.dsl.ContainerOp. Describe the interactions with the Docker container image created in the previous step by specifying the component name, the image to use, the command to run after the container starts, the input arguments, and the file outputs.
In [16]:
import kfp.dsl
def list_gcs_blobs_op(name, bucket):
    return kfp.dsl.ContainerOp(
        name=name,
        image='gcr.io/{}/listgcsblobs:latest'.format(PROJECT_ID),
        command=['python', '/app/app.py'],
        arguments=['--bucket', bucket],
        file_outputs={'blobs': '/blobs.txt'},
        output_artifact_paths={'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json'},
    )

def view_input_op(name, blobs):
    return kfp.dsl.ContainerOp(
        name=name,
        image='gcr.io/{}/viewinput:latest'.format(PROJECT_ID),
        command=['python', '/app/app.py'],
        arguments=['--blobs', blobs]
    )
Define your pipeline as a Python function. @kfp.dsl.pipeline is a required decorator that includes the name and description properties. pipeline_func defines the pipeline with the bucket parameter. When the user uploads the pipeline to the system and starts creating a new run from it, they'll see an input box for the bucket parameter with the initial value Enter your bucket name here. You can replace the initial value with your bucket name at run time. list_gcs_blobs_op('List', bucket) creates a component named List that lists the blobs. view_input_op('View', list_blobs_task.outputs['blobs']) creates a component named View that views a CSV. list_blobs_task.outputs['blobs'] tells the pipeline to take the output of the first component, stored as a string in blobs.txt, as an input for the second component.
In [17]:
# Create folders if they don't exist.
!mkdir -p tmp/pipelines
In [18]:
import datetime
import kfp.compiler as compiler
# Define the pipeline
@kfp.dsl.pipeline(
    name='Quickstart pipeline',
    description='Takes a GCS bucket name and views a CSV input file in the bucket.'
)
def pipeline_func(bucket='Enter your bucket name here.'):
    list_blobs_task = list_gcs_blobs_op('List', bucket)
    view_input_task = view_input_op('View', list_blobs_task.outputs['blobs'])

# Compile the pipeline to a file.
filename = 'tmp/pipelines/quickstart_pipeline{dt:%Y%m%d_%H%M%S}.pipeline.tar.gz'.format(
    dt=datetime.datetime.now())
compiler.Compiler().compile(pipeline_func, filename)
Follow the instructions on kubeflow.org to access Kubeflow UIs. Upload the created pipeline and run it.
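If you prefer to register the compiled package programmatically instead of through the UI, the SDK client also supports uploading it, assuming the notebook can reach the KFP endpoint (the host and pipeline name below are placeholders):
In [ ]:
import kfp

client = kfp.Client()  # or kfp.Client(host='https://<your-kfp-endpoint>')
client.upload_pipeline(filename, pipeline_name='quickstart-pipeline')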
In [19]:
# Clean up the temporary files created by this notebook.
import shutil
import pathlib

path = pathlib.Path("tmp")
shutil.rmtree(path)